// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * maintain the colocation table related indexes and meta
 */
public class ColocateTableIndex implements Writable {
    private static final Logger LOG = LogManager.getLogger(ColocateTableIndex.class);

    public static class GroupId implements Writable, GsonPostProcessable {
        public static final String GLOBAL_COLOCATE_PREFIX = "__global__";

        @SerializedName(value = "dbId")
        public Long dbId;
        @SerializedName(value = "grpId")
        public Long grpId;
        // only available when dbId = 0
        // because for global colocate table, the dbId is 0, so we do not know which db the table belongs to,
        // so we use tblId2DbId to record the dbId of each table
        @SerializedName(value = "tblId2DbId")
        private Map<Long, Long> tblId2DbId = Maps.newHashMap();

        private GroupId() {
        }

        public GroupId(long dbId, long grpId) {
            this.dbId = dbId;
            this.grpId = grpId;
        }

        public void addTblId2DbId(long tblId, long dbId) {
            Preconditions.checkState(this.dbId == 0);
            tblId2DbId.put(tblId, dbId);
        }

        public void removeTblId2DbId(long tblId) {
            tblId2DbId.remove(tblId);
        }

        public long getDbIdByTblId(long tblId) {
            return tblId2DbId.get(tblId);
        }

        public int getTblId2DbIdSize() {
            return tblId2DbId.size();
        }

        public static GroupId read(DataInput in) throws IOException {
            String json = Text.readString(in);
            return GsonUtils.GSON.fromJson(json, GroupId.class);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof GroupId)) {
                return false;
            }
            GroupId other = (GroupId) obj;
            return dbId.equals(other.dbId) && grpId.equals(other.grpId);
        }

        @Override
        public void gsonPostProcess() throws IOException {
            if (tblId2DbId == null) {
                tblId2DbId = Maps.newHashMap();
            }
        }

        @Override
        public int hashCode() {
            int result = 17;
            result = 31 * result + dbId.hashCode();
            result = 31 * result + grpId.hashCode();
            return result;
        }

        @Override
        public String toString() {
            return dbId + "." + grpId;
        }

        public static String getFullGroupName(long dbId, String colocateGroup) {
            if (colocateGroup.startsWith(GLOBAL_COLOCATE_PREFIX)) {
                return colocateGroup;
            } else {
                return dbId + "_" + colocateGroup;
            }
        }

        public static boolean isGlobalGroupName(String groupName) {
            return groupName.startsWith(GLOBAL_COLOCATE_PREFIX);
        }
    }

    // group_name -> group_id
    @SerializedName(value = "groupName2Id")
    private Map<String, GroupId> groupName2Id = Maps.newHashMap();
    // group_id -> table_ids
    @SerializedName(value = "group2Tables")
    private Multimap<GroupId, Long> group2Tables = ArrayListMultimap.create();
    // table_id -> group_id
    @SerializedName(value = "table2Group")
    private ConcurrentMap<Long, GroupId> table2Group = Maps.newConcurrentMap();
    // group id -> group schema
    @SerializedName(value = "group2Schema")
    private Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
    // group_id -> bucketSeq -> backend ids
    @SerializedName(value = "group2BackendsPerBucketSeq")
    private Table<GroupId, Tag, List<List<Long>>> group2BackendsPerBucketSeq = HashBasedTable.create();
    // the colocate group is unstable
    @SerializedName(value = "unstableGroups")
    private Set<GroupId> unstableGroups = Sets.newHashSet();
    // save some error msg of the group for show. no need to persist
    @SerializedName(value = "group2ErrMsgs")
    private Map<GroupId, String> group2ErrMsgs = Maps.newHashMap();

    private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock();

    public ColocateTableIndex() {

    }

    private void readLock() {
        this.lock.readLock().lock();
    }

    private void readUnlock() {
        this.lock.readLock().unlock();
    }

    private void writeLock() {
        this.lock.writeLock().lock();
    }

    private void writeUnlock() {
        this.lock.writeLock().unlock();
    }

    // NOTICE: call 'addTableToGroup()' will not modify 'group2BackendsPerBucketSeq'
    // 'group2BackendsPerBucketSeq' need to be set manually before or after, if necessary.
    public GroupId addTableToGroup(long dbId, OlapTable tbl, String fullGroupName, GroupId assignedGroupId) {
        writeLock();
        try {
            GroupId groupId = null;
            if (groupName2Id.containsKey(fullGroupName)) {
                groupId = groupName2Id.get(fullGroupName);
            } else {
                if (assignedGroupId != null) {
                    // use the given group id, eg, in replay process
                    groupId = assignedGroupId;
                } else {
                    // generate a new one
                    if (GroupId.isGlobalGroupName(fullGroupName)) {
                        groupId = new GroupId(0, Env.getCurrentEnv().getNextId());
                    } else {
                        groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId());
                    }
                }
                HashDistributionInfo distributionInfo = (HashDistributionInfo) tbl.getDefaultDistributionInfo();
                ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId,
                        distributionInfo.getDistributionColumns(), distributionInfo.getBucketNum(),
                        tbl.getDefaultReplicaAllocation());
                groupName2Id.put(fullGroupName, groupId);
                group2Schema.put(groupId, groupSchema);
                group2ErrMsgs.put(groupId, "");
            }
            // for global colocate table, dbId is 0, and we need to save the real dbId of the table
            if (groupId.dbId == 0) {
                groupId.addTblId2DbId(tbl.getId(), dbId);
            }
            group2Tables.put(groupId, tbl.getId());
            table2Group.put(tbl.getId(), groupId);
            return groupId;
        } finally {
            writeUnlock();
        }
    }

    public void addBackendsPerBucketSeq(GroupId groupId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
        writeLock();
        try {
            for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
                group2BackendsPerBucketSeq.put(groupId, entry.getKey(), entry.getValue());
            }
        } finally {
            writeUnlock();
        }
    }

    public void setBackendsPerBucketSeq(GroupId groupId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
        writeLock();
        try {
            Map<Tag, List<List<Long>>> backendsPerBucketSeqMap = group2BackendsPerBucketSeq.row(groupId);
            if (backendsPerBucketSeqMap != null) {
                backendsPerBucketSeqMap.clear();
            }
            for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
                group2BackendsPerBucketSeq.put(groupId, entry.getKey(), entry.getValue());
            }
        } finally {
            writeUnlock();
        }
    }

    public boolean addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, List<List<Long>> backendsPerBucketSeq,
            ReplicaAllocation originReplicaAlloc) {
        writeLock();
        try {
            ColocateGroupSchema groupSchema = group2Schema.get(groupId);
            // replica allocation has outdate
            if (groupSchema != null && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
                LOG.info("replica allocation has outdate for group {}, old replica alloc {}, new replica alloc {}",
                        groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc());
                return false;
            }
            group2BackendsPerBucketSeq.put(groupId, tag, backendsPerBucketSeq);
            return true;
        } finally {
            writeUnlock();
        }
    }

    public void markGroupUnstable(GroupId groupId, String reason, boolean needEditLog) {
        writeLock();
        try {
            if (!group2Tables.containsKey(groupId)) {
                return;
            }
            if (unstableGroups.add(groupId)) {
                group2ErrMsgs.put(groupId, Strings.nullToEmpty(reason));
                if (needEditLog) {
                    ColocatePersistInfo info = ColocatePersistInfo.createForMarkUnstable(groupId);
                    Env.getCurrentEnv().getEditLog().logColocateMarkUnstable(info);
                }
                LOG.info("mark group {} as unstable", groupId);
            }
        } finally {
            writeUnlock();
        }
    }

    public void markGroupStable(GroupId groupId, boolean needEditLog, ReplicaAllocation originReplicaAlloc) {
        writeLock();
        try {
            if (!group2Tables.containsKey(groupId)) {
                return;
            }
            // replica allocation is outdate
            ColocateGroupSchema groupSchema = group2Schema.get(groupId);
            if (groupSchema != null && originReplicaAlloc != null
                    && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
                LOG.warn("mark group {} failed, replica alloc has outdate, old replica alloc {}, new replica alloc {}",
                        groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc());
                return;
            }
            if (unstableGroups.remove(groupId)) {
                group2ErrMsgs.put(groupId, "");
                if (needEditLog) {
                    ColocatePersistInfo info = ColocatePersistInfo.createForMarkStable(groupId);
                    Env.getCurrentEnv().getEditLog().logColocateMarkStable(info);
                }
                LOG.info("mark group {} as stable", groupId);
            }
        } finally {
            writeUnlock();
        }
    }

    public boolean removeTable(long tableId) {
        writeLock();
        try {
            if (!table2Group.containsKey(tableId)) {
                return false;
            }

            GroupId groupId = table2Group.remove(tableId);
            groupId.removeTblId2DbId(tableId);
            group2Tables.remove(groupId, tableId);
            if (!group2Tables.containsKey(groupId)) {
                // all tables of this group are removed, remove the group
                group2BackendsPerBucketSeq.rowMap().remove(groupId);
                group2Schema.remove(groupId);
                group2ErrMsgs.remove(groupId);
                unstableGroups.remove(groupId);
                String fullGroupName = null;
                for (Map.Entry<String, GroupId> entry : groupName2Id.entrySet()) {
                    if (entry.getValue().equals(groupId)) {
                        fullGroupName = entry.getKey();
                        break;
                    }
                }
                if (fullGroupName != null) {
                    groupName2Id.remove(fullGroupName);
                }
            }
        } finally {
            writeUnlock();
        }

        return true;
    }

    public boolean isGroupUnstable(GroupId groupId) {
        readLock();
        try {
            return unstableGroups.contains(groupId);
        } finally {
            readUnlock();
        }
    }

    // ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic,
    // If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here
    // table2Group is ConcurrentHashMap
    public boolean isColocateTableNoLock(long tableId) {
        return table2Group.containsKey(tableId);
    }

    public boolean isColocateTable(long tableId) {
        readLock();
        try {
            return table2Group.containsKey(tableId);
        } finally {
            readUnlock();
        }
    }

    public boolean isGroupExist(GroupId groupId) {
        readLock();
        try {
            return group2Schema.containsKey(groupId);
        } finally {
            readUnlock();
        }
    }

    public boolean isSameGroup(long table1, long table2) {
        readLock();
        try {
            if (table2Group.containsKey(table1) && table2Group.containsKey(table2)) {
                return table2Group.get(table1).equals(table2Group.get(table2));
            }
            return false;
        } finally {
            readUnlock();
        }
    }

    public Set<GroupId> getUnstableGroupIds() {
        readLock();
        try {
            return Sets.newHashSet(unstableGroups);
        } finally {
            readUnlock();
        }
    }

    // ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic,
    // If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here
    // table2Group is ConcurrentHashMap
    public GroupId getGroupNoLock(long tableId) {
        Preconditions.checkState(table2Group.containsKey(tableId));
        return table2Group.get(tableId);
    }

    public GroupId getGroup(long tableId) {
        readLock();
        try {
            Preconditions.checkState(table2Group.containsKey(tableId));
            return table2Group.get(tableId);
        } finally {
            readUnlock();
        }
    }

    public Set<GroupId> getAllGroupIds() {
        readLock();
        try {
            return Sets.newHashSet(group2Tables.keySet());
        } finally {
            readUnlock();
        }
    }

    public Set<Long> getBackendsByGroup(GroupId groupId, Tag tag) {
        readLock();
        try {
            Set<Long> allBackends = new HashSet<>();
            List<List<Long>> backendsPerBucketSeq = group2BackendsPerBucketSeq.get(groupId, tag);
            // if create colocate table with empty partition or create colocate table
            // with dynamic_partition will cause backendsPerBucketSeq == null
            if (backendsPerBucketSeq != null) {
                for (List<Long> bes : backendsPerBucketSeq) {
                    allBackends.addAll(bes);
                }
            }
            return allBackends;
        } finally {
            readUnlock();
        }
    }

    public List<Long> getAllTableIds(GroupId groupId) {
        readLock();
        try {
            if (!group2Tables.containsKey(groupId)) {
                return Lists.newArrayList();
            }
            return Lists.newArrayList(group2Tables.get(groupId));
        } finally {
            readUnlock();
        }
    }

    public Map<Tag, List<List<Long>>> getBackendsPerBucketSeq(GroupId groupId) {
        readLock();
        try {
            Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(groupId);
            if (backendsPerBucketSeq == null) {
                return Maps.newHashMap();
            }
            return backendsPerBucketSeq;
        } finally {
            readUnlock();
        }
    }

    public List<List<Long>> getBackendsPerBucketSeqByTag(GroupId groupId, Tag tag) {
        readLock();
        try {
            List<List<Long>> backendsPerBucketSeq = group2BackendsPerBucketSeq.get(groupId, tag);
            if (backendsPerBucketSeq == null) {
                return Lists.newArrayList();
            }
            return backendsPerBucketSeq;
        } finally {
            readUnlock();
        }
    }

    // Get all backend ids except for the given tag
    public Set<Long> getBackendIdsExceptForTag(GroupId groupId, Tag tag) {
        Set<Long> beIds = Sets.newHashSet();
        readLock();
        try {
            Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(groupId);
            if (backendsPerBucketSeq == null) {
                return beIds;
            }

            for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
                if (entry.getKey().equals(tag)) {
                    continue;
                }
                for (List<Long> list : entry.getValue()) {
                    beIds.addAll(list);
                }
            }
            return beIds;
        } finally {
            readUnlock();
        }
    }


    public List<Set<Long>> getBackendsPerBucketSeqSet(GroupId groupId) {
        readLock();
        try {
            Map<Tag, List<List<Long>>> backendsPerBucketSeqMap = group2BackendsPerBucketSeq.row(groupId);
            if (backendsPerBucketSeqMap == null) {
                return Lists.newArrayList();
            }
            List<Set<Long>> list = Lists.newArrayList();

            // Merge backend ids of all tags
            for (Map.Entry<Tag, List<List<Long>>> backendsPerBucketSeq : backendsPerBucketSeqMap.entrySet()) {
                for (int i = 0; i < backendsPerBucketSeq.getValue().size(); ++i) {
                    if (list.size() == i) {
                        list.add(Sets.newHashSet());
                    }
                    list.get(i).addAll(backendsPerBucketSeq.getValue().get(i));
                }
            }
            return list;
        } finally {
            readUnlock();
        }
    }

    public Set<Long> getTabletBackendsByGroup(GroupId groupId, int tabletOrderIdx) {
        readLock();
        try {
            Map<Tag, List<List<Long>>> backendsPerBucketSeqMap = group2BackendsPerBucketSeq.row(groupId);
            if (backendsPerBucketSeqMap == null) {
                return Sets.newHashSet();
            }

            // Merge backend ids of all tags
            Set<Long> beIds = Sets.newHashSet();
            for (Map.Entry<Tag, List<List<Long>>> backendsPerBucketSeq : backendsPerBucketSeqMap.entrySet()) {
                if (tabletOrderIdx >= backendsPerBucketSeq.getValue().size()) {
                    return Sets.newHashSet();
                }
                beIds.addAll(backendsPerBucketSeq.getValue().get(tabletOrderIdx));
            }

            return beIds;
        } finally {
            readUnlock();
        }
    }

    public ColocateGroupSchema getGroupSchema(String fullGroupName) {
        readLock();
        try {
            if (!groupName2Id.containsKey(fullGroupName)) {
                return null;
            }
            return group2Schema.get(groupName2Id.get(fullGroupName));
        } finally {
            readUnlock();
        }
    }

    public ColocateGroupSchema getGroupSchema(GroupId groupId) {
        readLock();
        try {
            return group2Schema.get(groupId);
        } finally {
            readUnlock();
        }
    }

    public long getTableIdByGroup(String fullGroupName) {
        readLock();
        try {
            if (groupName2Id.containsKey(fullGroupName)) {
                GroupId groupId = groupName2Id.get(fullGroupName);
                Optional<Long> tblId = group2Tables.get(groupId).stream().findFirst();
                return tblId.isPresent() ? tblId.get() : -1;
            }
        } finally {
            readUnlock();
        }
        return -1;
    }

    public GroupId changeGroup(long dbId, OlapTable tbl, String oldGroup, String newGroup, GroupId assignedGroupId) {
        writeLock();
        try {
            if (!Strings.isNullOrEmpty(oldGroup)) {
                // remove from old group
                removeTable(tbl.getId());
            }
            String fullNewGroupName = GroupId.getFullGroupName(dbId, newGroup);
            return addTableToGroup(dbId, tbl, fullNewGroupName, assignedGroupId);
        } finally {
            writeUnlock();
        }
    }

    public void replayAddTableToGroup(ColocatePersistInfo info) throws MetaNotFoundException {
        long dbId = info.getGroupId().dbId;
        if (dbId == 0) {
            dbId = info.getGroupId().getDbIdByTblId(info.getTableId());
        }
        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        OlapTable tbl = (OlapTable) db.getTableOrMetaException(info.getTableId(),
                org.apache.doris.catalog.Table.TableType.OLAP);
        writeLock();
        try {
            Map<Tag, List<List<Long>>> map = info.getBackendsPerBucketSeq();
            for (Map.Entry<Tag, List<List<Long>>> entry : map.entrySet()) {
                group2BackendsPerBucketSeq.put(info.getGroupId(), entry.getKey(), entry.getValue());
            }
            String fullGroupName = GroupId.getFullGroupName(dbId, tbl.getColocateGroup());
            addTableToGroup(dbId, tbl, fullGroupName, info.getGroupId());
        } finally {
            writeUnlock();
        }
    }

    public void replayAddBackendsPerBucketSeq(ColocatePersistInfo info) {
        addBackendsPerBucketSeq(info.getGroupId(), info.getBackendsPerBucketSeq());
    }

    public void replayMarkGroupUnstable(ColocatePersistInfo info) {
        markGroupUnstable(info.getGroupId(), "replay mark group unstable", false);
    }

    public void replayMarkGroupStable(ColocatePersistInfo info) {
        markGroupStable(info.getGroupId(), false, null);
    }

    public void replayRemoveTable(ColocatePersistInfo info) {
        removeTable(info.getTableId());
    }

    public void replayModifyReplicaAlloc(ColocatePersistInfo info) throws UserException {
        writeLock();
        try {
            modifyColocateGroupReplicaAllocation(info.getGroupId(), info.getReplicaAlloc(),
                    info.getBackendsPerBucketSeq(), false);
        } finally {
            writeUnlock();
        }
    }

    // only for test
    public void clear() {
        writeLock();
        try {
            group2Tables.clear();
            table2Group.clear();
            group2BackendsPerBucketSeq.clear();
            group2Schema.clear();
            unstableGroups.clear();
        } finally {
            writeUnlock();
        }
    }

    public List<List<String>> getInfos() {
        List<List<String>> infos = Lists.newArrayList();
        readLock();
        try {
            for (Map.Entry<String, GroupId> entry : groupName2Id.entrySet()) {
                List<String> info = Lists.newArrayList();
                GroupId groupId = entry.getValue();
                info.add(groupId.toString());
                String dbName = "";
                if (groupId.dbId != 0) {
                    Database db = Env.getCurrentInternalCatalog().getDbNullable(groupId.dbId);
                    if (db != null) {
                        dbName = db.getFullName();
                        int index = dbName.indexOf(":");
                        if (index > 0) {
                            dbName = dbName.substring(index + 1); //use short db name
                        }
                    }
                }
                String groupName = entry.getKey();
                if (!GroupId.isGlobalGroupName(groupName)) {
                    groupName = dbName + "." + groupName.substring(groupName.indexOf("_") + 1);
                }
                info.add(groupName);
                info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
                ColocateGroupSchema groupSchema = group2Schema.get(groupId);
                info.add(String.valueOf(groupSchema.getBucketsNum()));
                info.add(String.valueOf(groupSchema.getReplicaAlloc().toCreateStmt()));
                List<String> cols = groupSchema.getDistributionColTypes().stream().map(
                        e -> e.toSql()).collect(Collectors.toList());
                info.add(Joiner.on(", ").join(cols));
                info.add(String.valueOf(!unstableGroups.contains(groupId)));
                info.add(Strings.nullToEmpty(group2ErrMsgs.get(groupId)));
                infos.add(info);
            }
        } finally {
            readUnlock();
        }
        return infos;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        writeLock();
        try {
            int size = groupName2Id.size();
            out.writeInt(size);
            for (Map.Entry<String, GroupId> entry : ImmutableMap.copyOf(groupName2Id).entrySet()) {
                Text.writeString(out, entry.getKey()); // group name
                entry.getValue().write(out); // group id
                Collection<Long> tableIds = group2Tables.get(entry.getValue());
                out.writeInt(tableIds.size());
                for (Long tblId : tableIds) {
                    out.writeLong(tblId); // table ids
                }
                ColocateGroupSchema groupSchema = group2Schema.get(entry.getValue());
                groupSchema.write(out); // group schema

                // backend seq
                Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(entry.getValue());
                out.writeInt(backendsPerBucketSeq.size());
                for (Map.Entry<Tag, List<List<Long>>> tag2Bucket2BEs : backendsPerBucketSeq.entrySet()) {
                    tag2Bucket2BEs.getKey().write(out);
                    out.writeInt(tag2Bucket2BEs.getValue().size());
                    for (List<Long> beIds : tag2Bucket2BEs.getValue()) {
                        out.writeInt(beIds.size());
                        for (Long be : beIds) {
                            out.writeLong(be);
                        }
                    }
                }
            }

            size = unstableGroups.size();
            out.writeInt(size);
            for (GroupId groupId : unstableGroups) {
                groupId.write(out);
            }
        } finally {
            writeUnlock();
        }

    }

    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String fullGrpName = Text.readString(in);
            GroupId grpId = GroupId.read(in);
            groupName2Id.put(fullGrpName, grpId);
            int tableSize = in.readInt();
            for (int j = 0; j < tableSize; j++) {
                long tblId = in.readLong();
                group2Tables.put(grpId, tblId);
                table2Group.put(tblId, grpId);
            }
            ColocateGroupSchema groupSchema = ColocateGroupSchema.read(in);
            group2Schema.put(grpId, groupSchema);

            // backends seqs
            int tagSize = in.readInt();
            for (int j = 0; j < tagSize; j++) {
                Tag tag = Tag.read(in);
                int bucketSize = in.readInt();
                List<List<Long>> bucketsSeq = Lists.newArrayList();
                for (int k = 0; k < bucketSize; k++) {
                    List<Long> beIds = Lists.newArrayList();
                    int beSize = in.readInt();
                    for (int l = 0; l < beSize; l++) {
                        beIds.add(in.readLong());
                    }
                    bucketsSeq.add(beIds);
                }
                group2BackendsPerBucketSeq.put(grpId, tag, bucketsSeq);
            }
        }

        size = in.readInt();
        for (int i = 0; i < size; i++) {
            unstableGroups.add(GroupId.read(in));
        }
    }

    public void setErrMsgForGroup(GroupId groupId, String message) {
        group2ErrMsgs.put(groupId, message);
    }

    // just for ut
    public Map<Long, GroupId> getTable2Group() {
        return table2Group;
    }

    public void alterColocateGroup(AlterColocateGroupCommand command) throws UserException {
        writeLock();
        try {
            Map<String, String> properties = command.getProperties();
            String dbName = command.getColocateGroupName().getDb();
            String groupName = command.getColocateGroupName().getGroup();
            long dbId = 0;
            if (!GroupId.isGlobalGroupName(groupName)) {
                Database db = (Database) Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
                dbId = db.getId();
            }
            String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
            ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
            if (groupSchema == null) {
                throw new DdlException("Not found colocate group " + command.getColocateGroupName().toSql());
            }

            GroupId groupId = groupSchema.getGroupId();

            if (properties.size() > 1) {
                throw new DdlException("Can only set one colocate group property at a time");
            }

            if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
                    || properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
                if (Config.isCloudMode()) {
                    throw new DdlException("Cann't modify colocate group replication in cloud mode");
                }

                ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
                Preconditions.checkState(!replicaAlloc.isNotSet());
                Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
                Map<Tag, List<List<Long>>> backendsPerBucketSeq = getBackendsPerBucketSeq(groupId);
                Map<Tag, List<List<Long>>> newBackendsPerBucketSeq = Maps.newHashMap();
                for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) {
                    List<List<Long>> newList = Lists.newArrayList();
                    for (List<Long> backends : entry.getValue()) {
                        newList.add(Lists.newArrayList(backends));
                    }
                    newBackendsPerBucketSeq.put(entry.getKey(), newList);
                }
                try {
                    ColocateTableCheckerAndBalancer.modifyGroupReplicaAllocation(replicaAlloc,
                            newBackendsPerBucketSeq, groupSchema.getBucketsNum());
                } catch (Exception e) {
                    LOG.warn("modify group [{}, {}] to replication allocation {} failed, bucket seq {}",
                            fullGroupName, groupId, replicaAlloc, backendsPerBucketSeq, e);
                    throw new DdlException(e.getMessage());
                }
                backendsPerBucketSeq = newBackendsPerBucketSeq;
                Preconditions.checkState(backendsPerBucketSeq.size() == replicaAlloc.getAllocMap().size());
                modifyColocateGroupReplicaAllocation(groupSchema.getGroupId(), replicaAlloc,
                        backendsPerBucketSeq, true);
            } else {
                throw new DdlException("Unknown colocate group property: " + properties.keySet());
            }
        } finally {
            writeUnlock();
        }
    }

    private void modifyColocateGroupReplicaAllocation(GroupId groupId, ReplicaAllocation replicaAlloc,
            Map<Tag, List<List<Long>>> backendsPerBucketSeq, boolean isReplay) throws UserException {
        ColocateGroupSchema groupSchema = getGroupSchema(groupId);
        if (groupSchema == null) {
            LOG.warn("not found group {}", groupId);
            return;
        }

        List<Long> tableIds = getAllTableIds(groupId);
        for (Long tableId : tableIds) {
            long dbId = groupId.dbId;
            if (dbId == 0) {
                dbId = groupId.getDbIdByTblId(tableId);
            }
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            OlapTable table = (OlapTable) db.getTableNullable(tableId);
            if (table == null || !isColocateTable(table.getId())) {
                continue;
            }
            table.writeLock();
            try {
                Map<String, String> tblProperties = Maps.newHashMap();
                tblProperties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
                        replicaAlloc.toCreateStmt());
                table.setReplicaAllocation(tblProperties);
                if (table.dynamicPartitionExists()) {
                    TableProperty tableProperty = table.getTableProperty();
                    // Merge the new properties with origin properties, and then analyze them
                    Map<String, String> origDynamicProperties = tableProperty.getOriginDynamicPartitionProperty();
                    origDynamicProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION,
                            replicaAlloc.toCreateStmt());
                    Map<String, String> analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition(
                            origDynamicProperties, table, db, isReplay);
                    tableProperty.modifyTableProperties(analyzedDynamicPartition);
                    tableProperty.buildDynamicProperty();
                }
                for (ReplicaAllocation alloc : table.getPartitionInfo().getPartitionReplicaAllocations().values()) {
                    Map<Tag, Short> allocMap = alloc.getAllocMap();
                    allocMap.clear();
                    allocMap.putAll(replicaAlloc.getAllocMap());
                }
            } finally {
                table.writeUnlock();
            }
        }

        if (!backendsPerBucketSeq.equals(group2BackendsPerBucketSeq.row(groupId))) {
            markGroupUnstable(groupId, "change replica allocation", false);
        }
        groupSchema.setReplicaAlloc(replicaAlloc);
        setBackendsPerBucketSeq(groupId, backendsPerBucketSeq);

        if (!isReplay) {
            ColocatePersistInfo info = ColocatePersistInfo.createForModifyReplicaAlloc(groupId,
                    replicaAlloc, backendsPerBucketSeq);
            Env.getCurrentEnv().getEditLog().logColocateModifyRepliaAlloc(info);
        }
        LOG.info("modify group {} replication allocation to {}, is replay {}", groupId, replicaAlloc, isReplay);
    }
}
